package org.zjvis.datascience.service.dag;

import com.alibaba.fastjson.JSONObject;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.web.client.ResourceAccessException;
import org.zjvis.datascience.common.constant.Constant;
import org.zjvis.datascience.common.dto.TaskInstanceDTO;
import org.zjvis.datascience.common.enums.JobStatus;
import org.zjvis.datascience.common.util.RestTemplateUtil;
import org.zjvis.datascience.common.vo.JobStatusVO;
import org.zjvis.datascience.service.TaskInstanceService;

import java.io.IOException;
import java.util.Map;
import java.util.concurrent.Callable;

/**
 * @description Pyspark任务调度器
 * @date 2021-12-24
 */
@Deprecated
public class PySparkSubmitRunner implements Callable<TaskRunnerResult> {

    private final static Logger logger = LoggerFactory.getLogger(PySparkSubmitRunner.class);

    private String errorTpl = "{\"status\":500, \"error_msg\":\"%s\"}";
    private String emptyTpl = "{\"status\":0, \"error_msg\":\"%s\"}";

    private int maxRetryTimes = 1;

    private int sleepTime = 20000;

    private TaskInstanceDTO instance;

    private RestTemplateUtil restTemplateUtil;

    private Map<String, TaskInstanceDTO> appIdInstanceMap;

    private TaskInstanceService taskInstanceService;


    private String appResourcePath = "";


    public PySparkSubmitRunner(RestTemplateUtil restTemplateUtil, TaskInstanceDTO instance
            , String appResourcePath, Map<String, TaskInstanceDTO> appIdInstanceMap,
                               TaskInstanceService taskInstanceService) {
        this.restTemplateUtil = restTemplateUtil;
        this.instance = instance;
        this.appResourcePath = appResourcePath;
        this.appIdInstanceMap = appIdInstanceMap;
        this.taskInstanceService = taskInstanceService;
    }

    @Override
    public TaskRunnerResult call() throws Exception {
        if (instance.hasPrecautionaryError()) {
            return new TaskRunnerResult(500, String.format(errorTpl, "error happens when init stage."));
        }
        int index = 0;
        TaskRunnerResult result = null;
        while (index < maxRetryTimes) {
            try {
                result = this.submit();
                if (result.getStatus() == 0) {
                    break;
                }
                ++index;
            } catch (ResourceAccessException e) {
                result = new TaskRunnerResult(0,
                        String.format(emptyTpl, "job probably not related to spark. Or there are some problem about the connection."));
                ++index;
            } catch (Exception e) {
                ++index;
                Thread.sleep(sleepTime);
                result = new TaskRunnerResult(500,
                        String.format(errorTpl, e.getMessage().replaceAll("\"", "'")));
            }


        }
        return result;
    }

    public TaskRunnerResult submit() {
        String appArgs = instance.getSqlText();
        //在最后拼上taskIns的id参数，算子需要该参数将输出元数据写入mysql
        if (StringUtils.isEmpty(appArgs)) {
            appArgs = Long.toString(instance.getId());
        } else {
            appArgs = appArgs + " " + instance.getId();
        }
        String applicationId = null;
        try {
            applicationId = restTemplateUtil.submitPySparkJob(appArgs, appResourcePath);//REMOVE
        } catch (IOException e) {
            return new TaskRunnerResult(0, String.format(emptyTpl, "job probably not related to spark. Or there are some problem about the connection."));
        }

        if (StringUtils.isEmpty(applicationId)) {
            return new TaskRunnerResult(500, String.format(errorTpl, "job submit fail"));
        }
        instance.setApplicationId(applicationId);
        taskInstanceService.update(instance);
        appIdInstanceMap.put(applicationId, instance);

        String output = "";
        int status = 0;
        while (true) {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {

            }
            JobStatusVO jobStatusVO = restTemplateUtil
                    .queryJobStatus(applicationId, instance.getLogInfo());

            if (null == jobStatusVO || StringUtils.isEmpty(jobStatusVO.getState())) {
                continue;
            }
            //未结束的任务跳过
            if (!JobStatus.jobIsEnd(jobStatusVO.getState())) {
                continue;
            }
            //成功后更新
            if (JobStatus.SUCCEEDED.toString().equals(jobStatusVO.getFinalStatus())) {
                try {
                    Thread.sleep(5000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                TaskInstanceDTO instanceNew = taskInstanceService.queryById(instance.getId());
                if (StringUtils.isNotEmpty(instanceNew.getLogInfo())) {
                    output = instanceNew.getLogInfo();
                    instance.setLogInfo(output);
                    status = JSONObject.parseObject(output).getInteger("status");
                }
                instance.setProgress(100);
                appIdInstanceMap.remove(applicationId);
                break;
                //失败后更新
            } else if (JobStatus.FAILED.toString().equals(jobStatusVO.getFinalStatus())) {
                status = 500;
                output = String.format(Constant.errorTpl, jobStatusVO.getDiagnostics()
                        .replaceAll("\"", "'"));
                instance.setProgress(100);
                appIdInstanceMap.remove(applicationId);
                break;
                //停止后更新
            } else if (JobStatus.KILLED.toString().equals(jobStatusVO.getFinalStatus())) {
                status = 500;
                output = String.format(Constant.errorTpl, jobStatusVO.getDiagnostics()
                        .replaceAll("\"", "'"));
                instance.setProgress(100);
                appIdInstanceMap.remove(applicationId);
                break;
            }
        }

        return new TaskRunnerResult(status, output);
    }
}
